package com.hanxiaozhang.test;

import com.hanxiaozhang.constant.RocketConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2023/8/14
 * @since 1.0.0
 */
@Slf4j
public class ConsumerConcurrently {

    public static void main(String[] args) throws Exception {

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.TEST_1_CONSUMER_GROUP);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
        // 订阅一个或多个Topic，用Tag来过滤需要消费的消息，这里指定*表示接收所有Tag的消息
        consumer.subscribe(RocketConstant.TEST_1_TOPIC, "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {
                log.info("{}收到消息：{}", this.getClass().getSimpleName(), messageExts);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
         // 启动消费者实例
        consumer.start();
        log.info("消费者启动成功!");
    }
}
